Spark for High-throughput, Scalable, Quantitative Analysis of Genome-Scale Datasets

Kevin Mader and Marco Stampanoni
27 May 2014, Zurich Machine Learning Meetup

Paul Scherrer Institut ETH Zurich 4Quant

Outline

  • Introduction
  • Motivation
  • Scaling Imaging
  • Scaling Post-processing
  • Beyond

Original goal

Understanding the relationship between genetic background and the microstructure of bone

Internal Structures

Introduction

  • A phenotype is a very general term for any measurable characteristic resulting from a combination of genetic background and environmental conditions
    • All fields of genetics are ultimately dependent on phenotypes, since they are the observable portion
  • Structural phenotypes are related to position, size, shape, and organization of different cells, tissues, and organs

Bone Structure

What is genome-scale?

  • Many data points: involves looking at 100s to 10,000s+ of samples
    • hundreds to thousands of pieces of genomic information for each sample
  • Reproducible, every sample needs to be handled identically to avoid biasing the results
  • Flexible!
    • many trends are not obvious
    • data needs to be explored
    • look for connections to human genome and between genes

Machine Learning: Goals

One way of looking at machine learning problems is to understand the relationship between multiple inputs and multiple outputs so that outputs can be predicted from inputs

  • Voice Recognition

    • inputs: sound waves
    • outputs: words
  • Search Optimization

    • input: search query
    • output: best result
    • prediction: better sorted results

Prediction

Once the system is understood, different inputs can be entered and the output predicted. A good algorithm will then reliably provide a consistent, meaningful answer

  • Voice Recognition prediction: unknown sounds to words
  • Search prediction: better sorted results or suggestions

Genomics as Machine Learning

  • input
    • genetic background (genotype markers ~ randomly switched)
    • environmental factors ( largely out of our control )
  • output
    • phenotypes: (cell count, shape, distribution, thickness)
  • prediction
    • identifying the actions of single genes
    • identifying human genes which might have similar actions

Genotypes

Mappings

Phenotyping

Phenotype Pipeline

Synchrotron-based X-Ray Tomographic Microscopy

The only technique which can do all of the following:

  • peer deep into large samples
  • achieve \( \mathbf{<1\mu m} \) isotropic spatial resolution
    • with 1.8mm field of view
  • achieve >10 Hz temporal resolution
  • 8GB/s of images

SLS and TOMCAT

[1] Mokso et al., J. Phys. D, 46(49),2013

Cellular and Canal Structures in Murine Cortical Bone

So we have the imaging modality, now what?

Time Consumption

Just like genetics and many other areas, the burden of imaging is moving from acquisition to post-processing

Time Consumption

Adapted from: Sboner A,et. al. Genome Biology, 2011

Imaging Motivation

There are three different types of problems that we will run into.

Really big data sets

  • Several copies of the dataset need to be in memory for processing
  • Computers with more than 256GB of memory are expensive and difficult to find
  • Even those typically have 16 cores, so that is still only 16GB per core
  • Drive speed / network file access becomes a limiting factor
  • If it crashes you lose everything
    • or you have to manually write a bunch of messy check-pointing code

Many datasets

  • For genome-scale studies 1000s of samples need to be analyzed identically
  • Dynamic experiments can have hundreds of measurements
  • Animal phenotyping can have many huge data-sets (1000s of 328GB datasets)

Exploratory Studies

  • Not sure what we are looking for
  • Easy to develop new analyses
  • Quick to test hypotheses

What about existing tools?

Positives

  • Friendly GUI
  • Optimized: Use Vectorized code and GPUs
  • Scale well up to 1024 x 1024 x 1024 \( \approx \) 1 GVx.

Negatives

  • if one machine crashes, everything crashes (some tools are very unstable)
  • Tedious to scale to many datasets
  • Arrays cannot handle multichannel data
  • implementing new algorithms is very difficult
  • extracting specific metrics (shape as a function of distance, etc) usually not possible
  • checkpoints must be done manually

Killers

  • You don't ask the questions you should, you ask the questions you can
  • None of these approaches are robust or deal with the data flood
  • 8 GB/s is more data than Facebook processes in a day and 3 times as many images as Instagram (and they are all 5Mpx, 14-bit, and we cannot compress them)

Scaling up Challenges

Parallel

Coordination

Parallel computing requires a significant amount of coordination between computers for tasks that are not easily parallelizable.

Mutability

If you have two cores / computers trying to write the same information at the same time, the result is no longer deterministic (not good)

Blocking

Taking turns and waiting for every independent process to run completely negates the benefits of parallel computing

Distributed

Sending Instructions / Data Afar

Fault Tolerance

If you have 1000 computers working on solving a problem and one fails, you do not want your whole job to crash

Data Storage

How can you access and process data from many different computers quickly, without very expensive infrastructure?

More Big Challenges: Implementation

Each parallel library / tool comes with its own constraints: CUDA, OpenCL, OpenMPI, Matlab Distributed Toolbox, KORUS, .NET Concurrent Framework, Python Multiprocessing, Hadoop, Storm, Impala, Redshift

  • Each means learning a very specific code base and managing the logistics of organizing jobs.
  • Almost none are fault-tolerant out of the box.
  • Lock-in: transitioning from one code base to another is at best nightmarish
  • Testing and profiling code locally can be very difficult
  • None of them are flexible enough to let extra processing power "jump in" when an intense computation needs it

Solution: Spark / Resilient Distributed Datasets

Technical Specifications

  • Developed by the AMP Lab at UC Berkeley in 2012
  • General tool for all Directed Acyclical Graph (DAG) workflows
  • Coarse-grained processing \( \rightarrow \) simple operations applied to entire sets
    • Map, reduce, join, group by, fold, foreach, filter,…
  • In-memory caching
  • Based on Scala, offers interactive REPL
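
The coarse-grained operations above have the same shape on an ordinary in-memory collection as on an RDD. A minimal sketch in plain Python (the deck's pyspark option), with made-up pixel data standing in for a distributed dataset:

```python
# Hedged sketch: the coarse-grained operations Spark exposes on RDDs
# (map, filter, reduce, group-by), shown here on a plain Python list of
# ((x, y), intensity) pairs. An RDD would run the same steps across a
# cluster; the call shapes are identical in PySpark.
from functools import reduce
from collections import defaultdict

pixels = [((0, 0), 0.2), ((0, 1), 0.8), ((1, 0), 0.6), ((1, 1), 0.9)]

# map: transform every element
scaled = [(pos, v * 2) for pos, v in pixels]

# filter: keep only the bright pixels
bright = [(pos, v) for pos, v in pixels if v > 0.5]

# reduce: fold all intensities into one value
total = reduce(lambda a, b: a + b, (v for _, v in pixels))

# group by: bucket pixels by row index (like RDD.groupBy)
by_row = defaultdict(list)
for (x, y), v in pixels:
    by_row[x].append(v)
```

The point of the coarse-grained model is that these few operation shapes are enough to express whole pipelines, which is what lets Spark schedule and recover them automatically.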

Apache Spark

Zaharia, M., et. al (2012). Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing

Practical Specification

  • Distributed, parallel computing without logistics, libraries, or compiling
  • Declarative rather than imperative
    • Apply operation \( f \) to each image / block
    • Even scheduling is handled automatically
  • Input and results can be stored in memory, on disk, in the cloud, using Hadoop, redundantly or not
  • Functional, unit, integration tests can be performed locally

Rich, heavily developed platform

Available Tools

Tools built for table-like data structures, and much better adapted to them.

Commercial Support

Dozens of major companies (Apple, Google, Facebook, Cisco, …) donate over $30M a year to development of Spark and the Berkeley Data Analytics Stack

  • 2 startups in the last 6 months with seed-funding in excess of $15M each

Academic Support

  • All source code is available on GitHub
    • Elegant
  • No patents or restrictions on usage
  • Machine Learning Course in D-INFK next semester based on Spark

Spark: Imaging Layer / TIPL

Developed at 4Quant, ETH Zurich, and Paul Scherrer Institut

  • SIL is a flexible framework for image processing and testing

    • modular design
    • verification and testing
    • reproduction of results from parameters
    • cluster integration
  • Report Generation

    • parametric assessment
    • pipelined analysis
    • Rendering subregions for movies
    • Displaying multidimensional data efficiently

TIPL

Post-processing: Statistical Analysis

  • Exploring phenotypes of 55 million cells.
  • Our current approach has been either to group and divide, then average the results
    • Average cell volume, etc.
    • Average density or connectivity
  • or a detailed analysis of individual samples
    • K-means clustering for different regions in bone / layers
    • Principal component analysis for phenotypes

Standard problem: asking the questions you can (which are easy), rather than the questions you should

Phenotype                     Within   Between   Ratio (%)
Length                         36.97     4.279       864.1
Width                          27.92     4.734       589.9
Height                         25.15     4.636       542.5
Volume                         67.85    12.479       543.7
Nearest Canal Distance         70.35   333.395        21.1
Density (Lc.Te.V)             144.40    27.658       522.1
Nearest Neighbors (Lc.DN)      31.86     1.835      1736.1
Stretch (Lc.St)                13.98     2.360       592.5
Oblateness (Lc.Ob)            141.27    18.465       765.1

The first two columns of the table show the within- and between-sample variation for selected phenotypes; the third column gives the ratio of the within- to the between-sample values

Visualizing the Variation

How does this result look visually? Each line shows the mean \( \pm \) standard deviation for one sample. The range within a single sample is clearly much larger than the range between samples

Reducing Variability

Instead of taking the standard metrics, we can search for

  • linear combinations (principal component analysis)
  • groups (k-means)

within the 65 million samples based on a number of different phenotypes to reduce the variation within single samples and

  • highlight the effect of genetic differences rather than spatial variation

Partial Volume Effect

Spark Imaging Performance

Real Performance

Overhead Costs

Spark Numerical Performance

With the ability to scale to millions of samples there is no need to condense. We can analyze the entire dataset in real time and try to identify groups or trends within the whole data instead of just precalculated views of it.

Practical

1276 comma-separated text files, each with 56 columns of data and 15-60K rows

Task                    Single Core Time   Spark Time (40 cores)
Load and Preprocess     360 minutes        10 minutes
Single Column Average   4.6 s              400 ms
1 K-means Iteration     2 minutes          1 s

We can iteratively explore the data and test hypotheses quickly

We found several composite phenotypes which are more consistent within samples than between samples

Quo Vadis: Realtime Image Processing?

Needs

  • Scale up to 1000GVx samples (eventually)
  • Analyze standard 14GVx measurements routinely, within a day
  • Analyze as fast as we usually measure: 15 minutes / scan

Would be nice

  • Analyze scans as fast as we measure now (1 scan / minute)
  • Analyze scans as fast as we can measure (10 scans / minute)

Tomcat Goals

Beyond: Spark Streaming for Acquisition

Before

  • Spec Macro to control acquisition
  • C program to copy data to network drive
  • Python-wrapped C program to locally create sinograms
  • Highly optimized C program called from Python script through PHP website
  • Bash scripts to copy data
  • DCL Scripts to format data properly
  • Proprietary image processing tools (with clicking)
  • Output statistics with Excel

With Spark

  • Spec Macro to control acquisition
  • Data into a ZeroMQ pipe
  • Spark Streaming on ZeroMQ pipe


Beyond: Approximate Results

Extensions of Spark out of AMPLab, like BlinkDB, are moving towards approximate results.

  • Instead of mean(volume)
    • mean(volume).within_time(5)
    • mean(volume).within_ci(0.95)

For real-time image processing, this could work well for tasks like component labeling and filtering

  • are all 27 neighbor voxels needed to get a good idea of the result?
  • must every component be analyzed to get an idea about the shape distribution?

It provides a much more robust solution than taking a small region of interest or scaling the data down
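
The idea behind such approximate queries can be sketched locally (this is not BlinkDB's actual API): estimate mean(volume) from a small random sample, trading a quantifiable error for speed. The volume values, sampling fraction, and seed below are all illustrative, in plain Python:

```python
# Hedged sketch of approximate aggregation: compute mean(volume) on a ~1%
# random sample instead of the full dataset. With n samples the standard
# error shrinks like 1/sqrt(n), so the error is quantifiable in advance.
# The data here is hypothetical (100,000 fake cell volumes).
import random

volumes = [i % 50 for i in range(100000)]  # hypothetical cell volumes
rng = random.Random(0)

# keep roughly 1% of the records, as RDD.sample(False, 0.01) would
sample = [v for v in volumes if rng.random() < 0.01]

exact_mean = sum(volumes) / len(volumes)
approx_mean = sum(sample) / len(sample)
```

On ~1000 sampled values the estimate lands within a fraction of a unit of the exact mean, after touching 1% of the data.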

What sort of projects demand this?

Zebra fish Phenotyping Project

Collaboration with Keith Cheng, Ian Foster, Xuying Xin, Patrick La Raviere, Steve Wang

1000s of samples of full adult animals, imaged at 0.74 \( \mu m \) resolution: Images 11500 x 2800 x 628 \( \longrightarrow \) 20-40GVx / sample

  • Identification of single cells (no down-sampling)
  • Cell networks and connectivity
  • Classification of cell type
  • Registration with histology

Brain Project

Collaboration with Alberto Astolfo, Matthias Schneider, Bruno Weber, Marco Stampanoni

1 \( cm^3 \) scanned at 1 \( \mu m \) resolution: Images \( \longrightarrow \) 1000 GVx / sample

  • Registration of separate scans together
  • Blood vessel structure and networks
  • Registration with fMRI, histology

Acknowledgements

  • AIT at PSI
  • TOMCAT Group

We are interested in partnerships and collaborations

Learn more at

Spark Preview

Install Spark

cd /scratch
curl -o spark.tgz http://d3kbcqa49mib13.cloudfront.net/spark-0.9.1-bin-hadoop1.tgz
tar -xvf spark.tgz
cd spark-0.9.1-bin-hadoop1/

Starting Spark

Spin up your own cluster in an hour. (Here we use it on one node acting as the master, scheduler, and worker, but normally it runs on different computers.)

  • Start the Spark-Shell ./bin/spark-shell
  • Start Spark-python ./bin/pyspark
    • Write code in Python

Getting an image to Key-Value Format

library(jpeg)
in.img<-readJPEG("ext-figures/input_image.jpg")
kv.img<-im.to.df(in.img)
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
x y val
1 1 0.6275
2 1 0.7804
3 1 0.8863
4 1 0.8980
5 1 0.9098
6 1 0.9216

The key is position \( \langle x, y \rangle \) and value is the intensity \( val \)
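
The same conversion can be sketched in plain Python (mirroring the R helper `im.to.df` above); the 2x3 intensity array here is hypothetical, standing in for the real JPEG:

```python
# Minimal sketch: turn a 2D intensity array into key-value records,
# ((x, y), value), the format the Spark examples below consume.
# The array values are made up for illustration.
img = [
    [0.62, 0.78, 0.88],
    [0.89, 0.90, 0.92],
]

# 1-based coordinates, matching the CSV rows shown above
kv_img = [((x + 1, y + 1), img[x][y])
          for x in range(len(img))
          for y in range(len(img[x]))]
```

Flattening the image this way is what makes it a plain dataset of records, so every generic key-value operation (filter, reduce by key, join) applies to images unchanged.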

Loading the data into Spark (Scala)

val rawImage=sc.textFile("cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
  • Count the number of pixels
imgAsKV.count
  • Get the first value
imgAsKV.take(1)
  • Sample 100 values from the data
imgAsKV.sample(true,0.1,0).collect

Perform a threshold

val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)
  • Runs on 1 core on your laptop or 1000 cores in the cloud or on ETH's Brutus Cluster.
  • If one computer crashes or disconnects it automatically continues on another one.
  • If one part of the computation is taking too long it will be sent to other computers to finish
  • If a computer runs out of memory it writes the remaining results to disk and continues running (graceful degradation in performance)

Get Volume Fraction

100.0*labelImg.count/(imgAsKV.count)

Region of Interest

Take a region of interest between 0 and 100 in X and Y

def roiFun(pvec: ((Int,Int),Double)) = 
 {pvec._1._1>=0 & pvec._1._1<100 & // X
  pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)

Perform a 3x3 box filter

def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val scalevalue=pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),scalevalue)
}

val filtImg=roiImg.
      flatMap(cvec => spread_voxels(cvec)).
      filter(roiFun).reduceByKey(_ + _)
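
Because spread_voxels is a pure function, the box filter can be sanity-checked locally without Spark: spreading each voxel over its window and re-summing by position must conserve total intensity away from the edges. A plain-Python sketch (a dict standing in for reduceByKey, two made-up input voxels):

```python
# Local check of the box-filter logic: flatMap(spread_voxels) followed by
# a sum-per-position must leave the total intensity unchanged when no
# spread values fall outside the domain (the ROI filter handles edges).
from collections import defaultdict

def spread_voxels(pvec, wind_size=1):
    # mirrors the Scala spread_voxels above: divide a voxel's value
    # evenly over a (2*wind_size + 1)^2 neighborhood
    (px, py), value = pvec
    wind = range(-wind_size, wind_size + 1)
    scale_value = value / (len(wind) ** 2)
    return [((px + x, py + y), scale_value) for x in wind for y in wind]

img = [((5, 5), 0.9), ((5, 6), 0.3)]

# flatMap + reduceByKey(_ + _), done locally with a dict
filtered = defaultdict(float)
for pvec in img:
    for pos, v in spread_voxels(pvec):
        filtered[pos] += v

total_before = sum(v for _, v in img)
total_after = sum(filtered.values())
```

This is the kind of functional unit test mentioned earlier: the same function that runs on the cluster can be exercised on a laptop with toy data.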

Setting up Component Labeling

  • Create the first labels from a thresholded image as a mutable type
val xWidth=100
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
  • Spreading to Neighbor Function
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val label=pvec._2._1
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}

Running Component Labeling

var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
  newLabels=newLabels.
  flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep!=groupList.deep)
  groupList=curGroupList
  iterations+=1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList

Calculating From Images

  • Average Voxel Count
val labelSize = newLabels.
  map(pvec => (pvec._2._1,1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
  • Center of Volume for Each Label
val labelPositions = newLabels.
  map(pvec => (pvec._2._1,pvec._1)).
  groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
(sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))

Lazy evaluation

  • No execution starts until you save the file or require output
  • Spark automatically deconstructs the pipeline and optimizes the jobs so computation is not wasted outside the region of interest (even though we applied the ROI filter last)
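
Spark's laziness is analogous to generators in Python: transformations only describe work, and nothing executes until an action forces output. A local analogy (not Spark itself) that counts how many times the "expensive" map step actually fires:

```python
# Lazy-pipeline sketch: the map step runs only when results are demanded,
# and only for as much input as the action actually needs.
evaluations = 0

def doubled(xs):
    global evaluations
    for x in xs:
        evaluations += 1  # transformation body: only runs on demand
        yield x * 2

# build the pipeline: a map (doubled) followed by a filter; nothing runs yet
pipeline = (v for v in doubled(range(1, 1001)) if v % 4 == 0)

before = evaluations  # still zero: the pipeline is only a description

# taking 3 results is the "action": only enough input is consumed to produce them
result = [next(pipeline) for _ in range(3)]
```

Only six of the thousand inputs are ever touched, just as Spark avoids computing partitions whose results are never requested.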